package com.zang.test.rocketmq.mq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author Zhang Qiang
 * @date 2019/10/31 9:54
 */
@Slf4j
@Component
public class Consumer {

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String pushConsumer;

    @PostConstruct
    private void defaultMqConsumer(){
        DefaultMQPushConsumer defaultMqPushConsumer = new DefaultMQPushConsumer(pushConsumer);
        defaultMqPushConsumer.setNamesrvAddr(namesrvAddr);
        try {
            defaultMqPushConsumer.subscribe("PushTopic", "push");
            defaultMqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            defaultMqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt ext : list) {
                        try {
                            String msgBody = new String(ext.getBody(), "utf-8");
                            log.info(" {} 响应消息： msgId：{}， msgBody: {}", ext.getMsgId(), ext.getMsgId(), msgBody );
                            TimeUnit.SECONDS.sleep(1);
                        } catch (Exception e) {
                            log.error(" 消费异常，请稍后再试 ");
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            defaultMqPushConsumer.start();
        } catch (Exception e) {
            log.error(" 消费异常: {} ", e.getMessage());
        }

    }


}
